package drds.data_propagate.sink.entry;

import drds.common.$;
import drds.data_propagate.binlog_event.secondary.Entry;
import drds.data_propagate.binlog_event.secondary.EntryType;
import drds.data_propagate.binlog_event.secondary.position.SlaveInfo;
import drds.data_propagate.sink.AbstractEventListSink;
import drds.data_propagate.sink.EventDownStreamHandler;
import drds.data_propagate.sink.EventListSink;
import drds.data_propagate.sink.exception.SinkException;
import drds.data_propagate.store.Event;
import drds.data_propagate.store.EventStore;
import drds.data_propagate.store.MemoryEventStore;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/**
 * mysql binlog数据对象输出
 */
@Slf4j
public class EntryEventListSink extends AbstractEventListSink<List<Entry>> implements EventListSink<List<Entry>> {


    private static final int maxFullTimes = 10;
    @Setter
    @Getter
    protected boolean notUseTransactionTimelineBarrier = false; // 是否需要尽可能过滤事务头/尾
    @Setter
    @Getter
    protected boolean ignoreEmtryTransactionEntry = true; // 是否需要过滤空的事务头/尾
    @Setter
    @Getter
    protected long emptyTransactionInterval = 5 * 1000; // 空的事务输出的频率
    @Setter
    @Getter
    protected long emptyTransctionThresold = 8192; // 超过8192个事务头，输出一个
    @Setter
    @Getter
    protected volatile long lastTransactionTimestamp = 0L;
    @Setter
    @Getter
    protected AtomicLong lastTransactionCount = new AtomicLong(0L);
    @Setter
    @Getter
    protected volatile long lastEmptyTransactionTimestamp = 0L;
    @Setter
    @Getter
    protected AtomicLong lastEmptyTransactionCount = new AtomicLong(0L);
    @Setter
    @Getter
    protected AtomicLong eventsSinkBlockingTime = new AtomicLong(0L);
    @Setter
    @Getter
    protected boolean raw;
    @Setter
    @Getter
    private EventStore<Event> eventStore;

    public EntryEventListSink() {
        addHandler(new HeartBeatEntryEventHandler());
    }

    public void start() {
        super.start();


        if (eventStore instanceof MemoryEventStore) {
            this.raw = ((MemoryEventStore) eventStore).isRaw();
        }

        for (EventDownStreamHandler handler : getEventDownStreamHandlerList()) {
            if (!handler.isStart()) {
                handler.start();
            }
        }
    }

    public void stop() {
        super.stop();

        for (EventDownStreamHandler handler : getEventDownStreamHandlerList()) {
            if (handler.isStart()) {
                handler.stop();
            }
        }
    }

    public boolean filter(List<Entry> entryList, InetSocketAddress remoteAddress, String destination) {
        return false;
    }

    public boolean sink(List<Entry> entryList, InetSocketAddress remoteAddress, String destination)
            throws SinkException, InterruptedException {
        return sinkData(entryList, remoteAddress);
    }

    private boolean sinkData(List<Entry> entryList, InetSocketAddress remoteAddress) throws InterruptedException {
        boolean hasRowData = false;
        boolean hasHeartBeat = false;
        List<Event> eventList = new ArrayList<Event>();
        for (Entry entry : entryList) {
            if (!doFilter(entry)) {
                continue;
            }

            if (notUseTransactionTimelineBarrier && (//
                    entry.getEntryType() == EntryType.transaction_begin//
                            || entry.getEntryType() == EntryType.transaction_end)//
            ) {//
                long executeTime = entry.getEntryHeader().getExecuteTimestamp();
                // 基于一定的策略控制，放过空的事务头和尾，便于及时更新数据库位点，表明工作正常
                if (lastTransactionCount.incrementAndGet() <= emptyTransctionThresold//
                        &&//
                        Math.abs(executeTime - lastTransactionTimestamp) <= emptyTransactionInterval) {//
                    continue;//不再增加到eventList
                } else {
                    lastTransactionCount.set(0L);
                    lastTransactionTimestamp = executeTime;
                }
            }

            hasRowData |= (entry.getEntryType() == EntryType.general_data);
            hasHeartBeat |= (entry.getEntryType() == EntryType.heartbeat);
            Event event = new Event(new SlaveInfo(remoteAddress, -1L), entry, raw);
            eventList.add(event);
        }
        //就这五种
//                transaction_begin(0, 1),
//                general_data(1, 2),
//                transaction_end(2, 3),
        //
//                heartbeat(3, 4),//心跳类型，内部使用，外部暂不可见，可忽略 *
        //???
//                gtid_log(4, 5)


        if (hasRowData || hasHeartBeat) {
            // 存在row记录 或者 存在heartbeat记录，直接跳给后续处理
            return doSink(eventList);
        } else {

            // EntryType.transaction_begin || EntryType.transaction_end ||EntryType.gtid三种
            // 数量不够时间才凑,还不够HeartBeat来凑
            if (ignoreEmtryTransactionEntry && $.isHasData(eventList)) {
                long executeTime = eventList.get(0).getExecuteTime();
                // 基于一定的策略控制，放过空的事务头和尾，便于及时更新数据库位点，表明工作正常
                if (Math.abs(executeTime - lastEmptyTransactionTimestamp) > emptyTransactionInterval//
                        || //
                        lastEmptyTransactionCount.incrementAndGet() > emptyTransctionThresold) {//
                    lastEmptyTransactionCount.set(0L);
                    lastEmptyTransactionTimestamp = executeTime;
                    return doSink(eventList);
                } else {
                    //...
                }
            }
            // 直接返回true，忽略空的事务头和尾
            return true;
        }
    }

    protected boolean doFilter(Entry entry) {
        if (binlogEventFilter != null && entry.getEntryType() == EntryType.general_data) {
            String name = getSchemaNameAndTableName(entry);
            boolean need = binlogEventFilter.filter(name);
            if (!need) {
                log.debug("binlogEventFilter taskId[{}] secondary : {}:{}", name, entry.getEntryHeader().getBinlogFileName(),
                        entry.getEntryHeader().getBinlogFileOffset());
            }

            return need;
        } else {
            return true;
        }
    }

    protected boolean doSink(List<Event> eventList) {
        for (EventDownStreamHandler<List<Event>> handler : getEventDownStreamHandlerList()) {
            eventList = handler.before(eventList);
        }
        long blockingStart = 0L;
        int fullTimes = 0;
        do {
            if (eventStore.tryPutOneTime(eventList)) {
                if (fullTimes > 0) {
                    eventsSinkBlockingTime.addAndGet(System.nanoTime() - blockingStart);
                }
                for (EventDownStreamHandler<List<Event>> handler : getEventDownStreamHandlerList()) {
                    eventList = handler.after(eventList);
                }
                return true;
            } else {
                if (fullTimes == 0) {
                    blockingStart = System.nanoTime();
                }
                applyWait(++fullTimes);
                if (fullTimes % 100 == 0) {
                    long nextStart = System.nanoTime();
                    eventsSinkBlockingTime.addAndGet(nextStart - blockingStart);
                    blockingStart = nextStart;
                }
            }

            for (EventDownStreamHandler<List<Event>> handler : getEventDownStreamHandlerList()) {
                eventList = handler.retry(eventList);
            }

        } while (running && !Thread.interrupted());
        return false;
    }

    // 处理无数据的情况，避免空循环挂死
    private void applyWait(int fullTimes) {
        int newFullTimes = fullTimes > maxFullTimes ? maxFullTimes : fullTimes;
        if (fullTimes <= 3) { // 3次以内
            Thread.yield();
        } else { // 超过3次，最多只sleep 10ms
            LockSupport.parkNanos(1000 * 1000L * newFullTimes);
        }

    }

    private String getSchemaNameAndTableName(Entry entry) {
        return entry.getEntryHeader().getSchemaName() + "." + entry.getEntryHeader().getTableName();
    }

}
